package org.apache.lucene.util;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.IndexInput;

/**
 * Represents a logical byte[] as a series of fixed-size pages (blocks).
 * You can write-once into the logical byte[] (append only) using the
 * <code>copy</code> methods or a {@link PagedBytesDataOutput}, then call
 * {@link #freeze} and retrieve slices ({@link BytesRef}) through the
 * returned {@link Reader}, or read sequentially through a
 * {@link PagedBytesDataInput}.
 *
 * @lucene.internal
 **/
public final class PagedBytes {
    // Completed blocks, in write order; the block being written is only
    // added here once it is retired (or on freeze).
    private final List<byte[]> blocks = new ArrayList<byte[]>();
    // For each entry in blocks, the value of upto at the time it was retired.
    private final List<Integer> blockEnd = new ArrayList<Integer>();
    private final int blockSize;
    private final int blockBits;
    private final int blockMask;
    // Set when copy(BytesRef, BytesRef) retired a block to start a new one;
    // freeze() refuses to run once this is set.
    private boolean didSkipBytes;
    private boolean frozen;
    // Next write position inside currentBlock; equals blockSize while no
    // block has been allocated yet, which forces allocation on first write.
    private int upto;
    private byte[] currentBlock;

    private static final byte[] EMPTY_BYTES = new byte[0];

    /**
     * Random-access reader over the blocks of a frozen {@link PagedBytes}.
     * Slices that straddle a block border are copied into a scratch buffer
     * obtained from a {@link CloseableThreadLocal}; that buffer is reused by
     * later calls, so a returned slice that points into it is only valid
     * until the next call that needs the scratch buffer. Call {@link #close}
     * to release the scratch buffers.
     */
    public final static class Reader implements Closeable {
        private final byte[][] blocks;
        private final int[] blockEnds;
        private final int blockBits;
        private final int blockMask;
        private final int blockSize;
        private final CloseableThreadLocal<byte[]> threadBuffers = new CloseableThreadLocal<byte[]>();

        /**
         * Snapshots the block list and block ends of the given instance into
         * arrays. The block byte[]s themselves are shared, not copied.
         */
        public Reader(PagedBytes pagedBytes) {
            blocks = new byte[pagedBytes.blocks.size()][];
            for (int i = 0; i < blocks.length; i++) {
                blocks[i] = pagedBytes.blocks.get(i);
            }
            blockEnds = new int[blocks.length];
            for (int i = 0; i < blockEnds.length; i++) {
                blockEnds[i] = pagedBytes.blockEnd.get(i);
            }
            blockBits = pagedBytes.blockBits;
            blockMask = pagedBytes.blockMask;
            blockSize = pagedBytes.blockSize;
        }

        /**
         * Gets a slice out of {@link PagedBytes} starting at <i>start</i> with a
         * given length. Iff the slice spans across a block border this method will
         * allocate sufficient resources and copy the paged data.
         * <p>
         * Slices spanning more than one block are not supported.
         * </p>
         *
         * @param b the {@link BytesRef} to fill
         * @param start absolute position of the first byte of the slice
         * @param length number of bytes in the slice, must be &gt;= 0
         * @return the given {@link BytesRef}
         * @lucene.internal
         **/
        public BytesRef fillSlice(BytesRef b, long start, int length) {
            assert length >= 0 : "length=" + length;
            final int index = (int) (start >> blockBits);
            final int offset = (int) (start & blockMask);
            return sliceAt(b, index, offset, length);
        }

        /**
         * Reads length as 1 or 2 byte vInt prefix, starting at <i>start</i>.
         * <p>
         * <b>Note:</b> this method does not support slices spanning across block
         * borders.
         * </p>
         *
         * @return the given {@link BytesRef}
         *
         * @lucene.internal
         **/
        public BytesRef fill(BytesRef b, long start) {
            final int index = (int) (start >> blockBits);
            final int offset = (int) (start & blockMask);
            fillFromPrefix(b, blocks[index], offset);
            return b;
        }

        /**
         * Reads length as 1 or 2 byte vInt prefix, starting at <i>start</i>.
         * <p>
         * <b>Note:</b> this method does not support slices spanning across block
         * borders.
         * </p>
         *
         * @return the internal block number of the slice.
         * @lucene.internal
         **/
        public int fillAndGetIndex(BytesRef b, long start) {
            final int index = (int) (start >> blockBits);
            final int offset = (int) (start & blockMask);
            fillFromPrefix(b, blocks[index], offset);
            return index;
        }

        /**
         * Reads length as 1 or 2 byte vInt prefix, starting at <i>start</i> and
         * returns the start offset of the next part, suitable as start parameter on
         * next call to sequentially read all {@link BytesRef}.
         *
         * <p>
         * <b>Note:</b> this method does not support slices spanning across block
         * borders.
         * </p>
         *
         * @return the start offset of the next part, suitable as start parameter on
         *         next call to sequentially read all {@link BytesRef}.
         * @lucene.internal
         **/
        public long fillAndGetStart(BytesRef b, long start) {
            final int index = (int) (start >> blockBits);
            final int offset = (int) (start & blockMask);
            final int prefixBytes = fillFromPrefix(b, blocks[index], offset);
            // Widened to long by the first operand, so no int overflow here.
            return start + prefixBytes + b.length;
        }

        /**
         * Gets a slice out of {@link PagedBytes} starting at <i>start</i>, the
         * length is read as 1 or 2 byte vInt prefix. Iff the slice spans across a
         * block border this method will allocate sufficient resources and copy the
         * paged data.
         * <p>
         * Slices spanning more than one block are not supported.
         * </p>
         *
         * @return the given {@link BytesRef}
         * @lucene.internal
         **/
        public BytesRef fillSliceWithPrefix(BytesRef b, long start) {
            final int index = (int) (start >> blockBits);
            int offset = (int) (start & blockMask);
            final byte[] block = blocks[index];
            final int length;
            if ((block[offset] & 128) == 0) {
                // High bit clear: single-byte length (0..127).
                length = block[offset];
                offset = offset + 1;
            } else {
                // High bit set: 15-bit length in the low 7 bits plus the next byte.
                length = ((block[offset] & 0x7f) << 8) | (block[1 + offset] & 0xff);
                offset = offset + 2;
                assert length > 0;
            }
            assert length >= 0 : "length=" + length;
            return sliceAt(b, index, offset, length);
        }

        /**
         * Returns the internal block array (not a copy).
         *
         * @lucene.internal
         */
        public byte[][] getBlocks() {
            return blocks;
        }

        /**
         * Returns the internal block-end array (not a copy).
         *
         * @lucene.internal
         */
        public int[] getBlockEnds() {
            return blockEnds;
        }

        /** Closes the thread-local holder of the scratch buffers. */
        public void close() {
            threadBuffers.close();
        }

        /**
         * Points b at length bytes starting at offset inside block index. When
         * the bytes fit in that block b references the block directly;
         * otherwise the bytes are copied from this block and the next one into
         * the scratch buffer and b references the buffer at offset 0.
         */
        private BytesRef sliceAt(BytesRef b, int index, int offset, int length) {
            b.length = length;
            final int available = blockSize - offset;
            if (available >= length) {
                // Within block
                b.bytes = blocks[index];
                b.offset = offset;
            } else {
                // Split
                byte[] buffer = threadBuffers.get();
                if (buffer == null) {
                    buffer = new byte[length];
                    threadBuffers.set(buffer);
                } else if (buffer.length < length) {
                    buffer = ArrayUtil.grow(buffer, length);
                    threadBuffers.set(buffer);
                }
                b.bytes = buffer;
                b.offset = 0;
                System.arraycopy(blocks[index], offset, buffer, 0, available);
                System.arraycopy(blocks[1 + index], 0, buffer, available, length - available);
            }
            return b;
        }

        /**
         * Decodes the 1 or 2 byte length prefix at block[offset] and points b
         * at the bytes that follow it inside the same block.
         *
         * @return the number of prefix bytes consumed (1 or 2)
         */
        private static int fillFromPrefix(BytesRef b, byte[] block, int offset) {
            b.bytes = block;
            final int first = block[offset];
            if ((first & 128) == 0) {
                b.length = first;
                b.offset = offset + 1;
                return 1;
            }
            b.length = ((first & 0x7f) << 8) | (block[1 + offset] & 0xff);
            b.offset = offset + 2;
            assert b.length > 0;
            return 2;
        }
    }

    /** 1<<blockBits must be bigger than biggest single
     *  BytesRef slice that will be pulled */
    public PagedBytes(int blockBits) {
        // Outside 0..30 the shift yields a non-positive or wrapped block size.
        assert blockBits >= 0 && blockBits <= 30 : "blockBits=" + blockBits;
        this.blockSize = 1 << blockBits;
        this.blockBits = blockBits;
        blockMask = blockSize - 1;
        // No block allocated yet: pretend the (absent) block is full so that
        // the first write allocates one.
        upto = blockSize;
    }

    /**
     * Retires the current block (if any) to the block list, recording how far
     * it was filled, and starts writing at the beginning of a fresh block.
     */
    private void startNewBlock() {
        if (currentBlock != null) {
            blocks.add(currentBlock);
            blockEnd.add(upto);
        }
        currentBlock = new byte[blockSize];
        upto = 0;
    }

    /**
     * Read this many bytes from in, appending them and spilling into new
     * blocks as needed.
     *
     * @param in source to read from
     * @param byteCount number of bytes to read
     * @throws IOException if reading from in fails
     */
    public void copy(IndexInput in, long byteCount) throws IOException {
        while (byteCount > 0) {
            int left = blockSize - upto;
            if (left == 0) {
                startNewBlock();
                left = blockSize;
            }
            if (left < byteCount) {
                // Fill the rest of this block and continue in the next one.
                in.readBytes(currentBlock, upto, left, false);
                upto = blockSize;
                byteCount -= left;
            } else {
                in.readBytes(currentBlock, upto, (int) byteCount, false);
                upto += byteCount;
                break;
            }
        }
    }

    /**
     * Copy BytesRef in, appending its bytes and spilling into new blocks as
     * needed (the bytes may end up split across a block border).
     */
    public void copy(BytesRef bytes) throws IOException {
        int byteCount = bytes.length;
        int bytesUpto = bytes.offset;
        while (byteCount > 0) {
            int left = blockSize - upto;
            if (left == 0) {
                startNewBlock();
                left = blockSize;
            }
            if (left < byteCount) {
                System.arraycopy(bytes.bytes, bytesUpto, currentBlock, upto, left);
                upto = blockSize;
                byteCount -= left;
                bytesUpto += left;
            } else {
                System.arraycopy(bytes.bytes, bytesUpto, currentBlock, upto, byteCount);
                upto += byteCount;
                break;
            }
        }
    }

    /** Copy BytesRef in, setting BytesRef out to the result.
     * Do not use this if you will use freeze(true).
     * This only supports bytes.length <= blockSize.
     * <p>
     * The bytes are never split: if they do not fit in the remainder of the
     * current block, that remainder is skipped and a new block is started
     * (after which {@link #freeze} will refuse to run). */
    public void copy(BytesRef bytes, BytesRef out) throws IOException {
        final int left = blockSize - upto;
        if (bytes.length > left || currentBlock == null) {
            if (currentBlock != null) {
                // The tail of the retired block is left unused.
                didSkipBytes = true;
            }
            startNewBlock();
            assert bytes.length <= blockSize;
            // TODO: we could also support variable block sizes
        }

        out.bytes = currentBlock;
        out.offset = upto;
        out.length = bytes.length;

        System.arraycopy(bytes.bytes, bytes.offset, currentBlock, upto, bytes.length);
        upto += bytes.length;
    }

    /**
     * Commits final byte[], trimming it if necessary and if trim=true.
     *
     * @param trim if true and the last block is not full, replace it with a
     *        copy sized to the bytes actually written
     * @return a {@link Reader} over the committed blocks
     * @throws IllegalStateException if already frozen, or if
     *         {@link #copy(BytesRef, BytesRef)} skipped bytes at the end of a block
     */
    public Reader freeze(boolean trim) {
        if (frozen) {
            throw new IllegalStateException("already frozen");
        }
        if (didSkipBytes) {
            throw new IllegalStateException("cannot freeze when copy(BytesRef, BytesRef) was used");
        }
        if (trim && upto < blockSize) {
            final byte[] newBlock = new byte[upto];
            System.arraycopy(currentBlock, 0, newBlock, 0, upto);
            currentBlock = newBlock;
        }
        if (currentBlock == null) {
            // Nothing was ever written; still expose one (empty) block.
            currentBlock = EMPTY_BYTES;
        }
        blocks.add(currentBlock);
        blockEnd.add(upto);
        frozen = true;
        currentBlock = null;
        return new Reader(this);
    }

    /**
     * Returns the absolute position at which the next byte would be written,
     * or 0 when there is no current block.
     */
    public long getPointer() {
        if (currentBlock == null) {
            return 0;
        } else {
            return (blocks.size() * ((long) blockSize)) + upto;
        }
    }

    /** Copy bytes in, writing the length as a 1 or 2 byte
     *  vInt prefix. The prefix and the bytes are always stored
     *  contiguously inside a single block.
     *
     * @return the position of the prefix, usable as the start argument of
     *         the {@link Reader} fill methods
     * @throws IllegalArgumentException if bytes.length is larger than 32767
     *         (the most the 2 byte prefix can encode), or if the prefix plus
     *         bytes cannot fit into one block */
    public long copyUsingLengthPrefix(BytesRef bytes) throws IOException {
        if (bytes.length >= 32768) {
            // The 2 byte prefix only carries 15 bits; anything larger would be
            // silently truncated by the byte cast below.
            throw new IllegalArgumentException("max length is 32767 (got " + bytes.length + ")");
        }

        if (upto + bytes.length + 2 > blockSize) {
            if (bytes.length + 2 > blockSize) {
                throw new IllegalArgumentException("block size " + blockSize + " is too small to store length " + bytes.length + " bytes");
            }
            startNewBlock();
        }

        final long pointer = getPointer();

        if (bytes.length < 128) {
            currentBlock[upto++] = (byte) bytes.length;
        } else {
            // High bit marks the 2 byte form.
            currentBlock[upto++] = (byte) (0x80 | (bytes.length >> 8));
            currentBlock[upto++] = (byte) (bytes.length & 0xff);
        }
        System.arraycopy(bytes.bytes, bytes.offset, currentBlock, upto, bytes.length);
        upto += bytes.length;

        return pointer;
    }

    /**
     * Sequential {@link DataInput} over the blocks of the enclosing, frozen
     * {@link PagedBytes}. Obtain instances via {@link PagedBytes#getDataInput}.
     */
    public final class PagedBytesDataInput extends DataInput {
        private int currentBlockIndex;
        private int currentBlockUpto;
        private byte[] currentBlock;

        PagedBytesDataInput() {
            currentBlock = blocks.get(0);
        }

        /** Returns a new input over the same data, positioned where this one is. */
        @Override
        public Object clone() {
            PagedBytesDataInput clone = getDataInput();
            clone.setPosition(getPosition());
            return clone;
        }

        /** Returns the current byte position. */
        public long getPosition() {
            // Multiply as long: index * blockSize exceeds int range once the
            // position reaches 2 GiB.
            return (long) currentBlockIndex * blockSize + currentBlockUpto;
        }

        /** Seek to a position previously obtained from
         *  {@link #getPosition}. */
        public void setPosition(long pos) {
            currentBlockIndex = (int) (pos >> blockBits);
            currentBlock = blocks.get(currentBlockIndex);
            currentBlockUpto = (int) (pos & blockMask);
        }

        @Override
        public byte readByte() {
            if (currentBlockUpto == blockSize) {
                nextBlock();
            }
            return currentBlock[currentBlockUpto++];
        }

        @Override
        public void readBytes(byte[] b, int offset, int len) {
            assert b.length >= offset + len;
            final int offsetEnd = offset + len;
            while (true) {
                final int blockLeft = blockSize - currentBlockUpto;
                final int left = offsetEnd - offset;
                if (blockLeft < left) {
                    // Drain the rest of this block, then continue in the next.
                    System.arraycopy(currentBlock, currentBlockUpto, b, offset, blockLeft);
                    nextBlock();
                    offset += blockLeft;
                } else {
                    // Last block
                    System.arraycopy(currentBlock, currentBlockUpto, b, offset, left);
                    currentBlockUpto += left;
                    break;
                }
            }
        }

        /** Advances to the start of the following block. */
        private void nextBlock() {
            currentBlockIndex++;
            currentBlockUpto = 0;
            currentBlock = blocks.get(currentBlockIndex);
        }
    }

    /**
     * {@link DataOutput} that appends to the enclosing {@link PagedBytes}.
     * Obtain instances via {@link PagedBytes#getDataOutput}.
     */
    public final class PagedBytesDataOutput extends DataOutput {
        @Override
        public void writeByte(byte b) {
            if (upto == blockSize) {
                startNewBlock();
            }
            currentBlock[upto++] = b;
        }

        @Override
        public void writeBytes(byte[] b, int offset, int length) throws IOException {
            assert b.length >= offset + length;
            if (length == 0) {
                return;
            }

            if (upto == blockSize) {
                startNewBlock();
            }

            final int offsetEnd = offset + length;
            while (true) {
                final int left = offsetEnd - offset;
                final int blockLeft = blockSize - upto;
                if (blockLeft < left) {
                    // Fill this block completely, retire it, and carry on.
                    System.arraycopy(b, offset, currentBlock, upto, blockLeft);
                    upto = blockSize;
                    startNewBlock();
                    offset += blockLeft;
                } else {
                    // Last block
                    System.arraycopy(b, offset, currentBlock, upto, left);
                    upto += left;
                    break;
                }
            }
        }

        /** Return the current byte position. */
        public long getPosition() {
            // Shares the overflow-safe long arithmetic of getPointer().
            return PagedBytes.this.getPointer();
        }
    }

    /** Returns a DataInput to read values from this
     *  PagedBytes instance.
     *
     * @throws IllegalStateException if {@link #freeze} has not been called */
    public PagedBytesDataInput getDataInput() {
        if (!frozen) {
            throw new IllegalStateException("must call freeze() before getDataInput");
        }
        return new PagedBytesDataInput();
    }

    /** Returns a DataOutput that you may use to write into
     *  this PagedBytes instance.  If you do this, you should
     *  not call the other writing methods (eg, copy);
     *  results are undefined.
     *
     * @throws IllegalStateException if {@link #freeze} was already called */
    public PagedBytesDataOutput getDataOutput() {
        if (frozen) {
            throw new IllegalStateException("cannot get DataOutput after freeze()");
        }
        return new PagedBytesDataOutput();
    }
}
